 1. Preparing to Read the Source Code
   
   1) Download the official Apache Hadoop 2.9.2 source code.
   2) Import the source into IDEA:
   Start IDEA and choose Import Project on the welcome screen.
   Click Import Project, browse to the extracted hadoop-2.9.2-src directory, and click OK.
   Choose to import it as a Maven project.
   Check "Search for projects recursively" and "Import Maven projects automatically".
   Click Next and select src; click Next and select org.apache.hadoop:hadoop-main:2.9.2; click Next.
   Keep Project name as hadoop-main and click Finish.
   Wait for the dependencies to download and resolve, and the source import is complete!
 
 2. NameNode Startup Flow
   
   Start the HDFS cluster with the start-dfs.sh command.
   This command starts both the HDFS NameNode and the DataNodes. The NameNode is started through the org.apache.hadoop.hdfs.server.namenode.NameNode class.
   We focus on what the NameNode does during startup (technical details off the main path are not explored in depth).
   For the startup flow, two pieces of code matter most:
   public class NameNode extends ReconfigurableBase implements NameNodeStatusMXBean {
    // This static block initializes HDFS configuration information.
    static {
     HdfsConfiguration.init(); // The method body is empty -- does nothing at all?
                               // Not quite: look at HdfsConfiguration's static block.
    }
   // The HdfsConfiguration class and its static block
   public class HdfsConfiguration extends Configuration {
    static {
     addDeprecatedKeys();
     // adds the default resources
     Configuration.addDefaultResource("hdfs-default.xml");
     Configuration.addDefaultResource("hdfs-site.xml");
    }
   }
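   So the empty-looking init() matters only for its class-loading side effect: it runs the static block, which registers hdfs-default.xml and hdfs-site.xml as default resources for every Configuration in the JVM. A minimal sketch of that effect (the demo class and the dfs.replication probe are ours for illustration; it assumes hadoop-hdfs 2.9.2 on the classpath):
   
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.hdfs.HdfsConfiguration;
   
   public class InitDemo {
     public static void main(String[] args) {
       // The empty init() only forces class loading, which runs the static block.
       HdfsConfiguration.init();
       // hdfs-default.xml is now a default resource of *all* Configuration
       // instances, so HDFS keys resolve even on a plain Configuration:
       Configuration conf = new Configuration();
       System.out.println(conf.get("dfs.replication")); // expected "3" from hdfs-default.xml
     }
   }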
  
   // The main method
  public static void main(String argv[]) throws Exception {
   // Check whether the arguments ask for help; if so, print the usage and exit.
   if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
     System.exit(0);
   }
   try {
     // Print the startup message and register a shutdown hook
     // (which logs a message when the node shuts down)
     StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
     // Create the namenode
     NameNode namenode = createNameNode(argv, null);
     if (namenode != null) {
       // Block until the namenode stops (joins the RPC server thread)
       namenode.join();
     }
   } catch (Throwable e) {
     // Error handling
     LOG.error("Failed to start namenode.", e);
     terminate(1, e);
   }
  }
   // Now look at createNameNode
   public static NameNode createNameNode(String argv[], Configuration conf)
    throws IOException {
   LOG.info("createNameNode " + Arrays.asList(argv));
   if (conf == null)
    conf = new HdfsConfiguration();
   // Parse out some generic args into Configuration.
   GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
   argv = hParser.getRemainingArgs();
    // Parse the rest, NN specific args.
    // Parse the startup options
   StartupOption startOpt = parseArguments(argv);
   if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
   setStartupOption(conf, startOpt);

   switch (startOpt) {
   ....
    default: { // A normal startup takes this branch
      // Initialize the metrics system
     DefaultMetricsSystem.initialize("NameNode");
      // Return a new NameNode
     return new NameNode(conf);
   }
  }
 }
 ----------------------------------------------------------------------
   // The NameNode constructors
 public NameNode(Configuration conf) throws IOException {
  this(conf, NamenodeRole.NAMENODE);
}
 
 ...
 protected NameNode(Configuration conf, NamenodeRole role)
   throws IOException {
  this.conf = conf;
  this.role = role;
   // Set NameNode#clientNamenodeAddress (e.g. "hdfs://localhost:9000", derived from fs.defaultFS)
  setClientNamenodeAddress(conf);
  String nsId = getNameServiceId(conf);
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
   // HA-related
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  state = createHAState(getStartupOption(conf));
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
   initializeGenericKeys(conf, nsId, namenodeId);
    // Do the actual initialization work
   initialize(conf);
    // HA-related
   try {
    haContext.writeLock();
    state.prepareToEnterState(haContext);
    state.enterState(haContext);
  } finally {
    haContext.writeUnlock();
  }
 } catch (IOException e) {
   this.stop();
   throw e;
 } catch (HadoopIllegalArgumentException e) {
   this.stop();
   throw e;
 }
}
   // Even when HA is not enabled locally (haEnabled=false), the namenode still holds an HAState; in that case its HAState is active.
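   The reason is visible in NameNode#createHAState (paraphrased below from the 2.9.2 source; consult the file itself for the exact form): with HA disabled, or during an upgrade, the namenode goes straight to the active state.
   
   protected HAState createHAState(StartupOption startOpt) {
    if (!haEnabled || startOpt == StartupOption.UPGRADE
        || startOpt == StartupOption.UPGRADEONLY) {
     return ACTIVE_STATE;  // non-HA namenodes are simply always active
    } else {
     return STANDBY_STATE; // HA namenodes start as standby and may fail over later
    }
   }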
----------------------------------------------------------------------
   // The actual initialization work
   // initialize(conf);
 protected void initialize(Configuration conf) throws IOException {
  if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
   String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
   if (intervals != null) {
    conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
     intervals);
  }
 }
  
  UserGroupInformation.setConfiguration(conf);
  loginAsNameNodeUser(conf);
   // Initialize metrics
  NameNode.initMetrics(conf, this.getRole());
  StartupProgressMetrics.register(startupProgress);
   // Start the HTTP server
  if (NamenodeRole.NAMENODE == role) {
	startHttpServer(conf);
}
  this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
   // Load the fsimage and edit logs from the namenode directories; initialize FSNamesystem, FSDirectory, LeaseManager, etc.
  loadNamesystem(conf);
   // Create the RPC server, which wraps NameNodeRpcServer#clientRpcServer and
   // supports ClientNamenodeProtocol, DatanodeProtocolPB, and other protocols
  rpcServer = createRpcServer(conf);
  if (clientNamenodeAddress == null) {
   // This is expected for MiniDFSCluster. Set it now using
   // the RPC server's bind address.
   clientNamenodeAddress =
     NetUtils.getHostPortString(rpcServer.getRpcAddress());
   LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
     + " this namenode/service.");
 }
  if (NamenodeRole.NAMENODE == role) {
   httpServer.setNameNodeAddress(getNameNodeAddress());
   httpServer.setFSImage(getFSImage());
 }
 
   // Start the JvmPauseMonitor, which watches the JVM itself (e.g. for GC pauses)
  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
   // Start several threads that carry out very important work
  startCommonServices(conf);
}
---------------------------------------------------------------------- 

 private void startCommonServices(Configuration conf) throws IOException {
   // Create the NameNodeResourceChecker, activate the BlockManager, etc.
   namesystem.startCommonServices(conf, haContext);
   registerNNSMXBean();
   // Roles other than NamenodeRole.NAMENODE start their HttpServer here
   if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
 // Start the RPC server
   rpcServer.start();
  ...// Start the plugins
   LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
   if (rpcServer.getServiceRpcAddress() != null) {
    LOG.info(getRole() + " service RPC up at: "
      + rpcServer.getServiceRpcAddress());
  }
 }
 --------------------------------------------------------------------------------
 
 void startCommonServices(Configuration conf, HAContext haContext)
   throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    // Create the NameNodeResourceChecker and run a first check immediately
   nnResourceChecker = new NameNodeResourceChecker(conf);
   checkAvailableResources();
   assert safeMode != null && !isPopulatingReplQueues();
    // Record some startup-progress information (the SAFEMODE phase)
   StartupProgress prog = NameNode.getStartupProgress();
   prog.beginPhase(Phase.SAFEMODE);
   prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
    getCompleteBlocksTotal());
    // Set the expected total of complete blocks
   setBlockTotal();
    // Activate the BlockManager
   blockManager.activate(conf);
 } finally {
   writeUnlock();
 }
  
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  snapshotManager.registerMXBean();
}
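   The SAFEMODE phase recorded above ends once enough of the blocks counted by getCompleteBlocksTotal()/setBlockTotal() have been reported; the threshold is configurable. A small sketch (key name and default as shipped in hdfs-default.xml; the demo class is ours):
   
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.hdfs.HdfsConfiguration;
   
   public class SafeModeThresholdDemo {
     public static void main(String[] args) {
       Configuration conf = new HdfsConfiguration();
       // Fraction of blocks that must be reported before safe mode can exit
       float pct = conf.getFloat("dfs.namenode.safemode.threshold-pct", 0.999f);
       System.out.println("safe mode exit threshold = " + pct); // 0.999 by default
     }
   }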
  
   // blockManager.activate(conf) activates the BlockManager, mainly by starting the
   // PendingReplicationMonitor, DecommissionManager#Monitor, HeartbeatManager#Monitor,
   // and ReplicationMonitor threads.
  
   public void activate(Configuration conf) {
   // Start the PendingReplicationMonitor
   pendingReplications.start();
   // Activate the DatanodeManager: starts DecommissionManager#Monitor and HeartbeatManager#Monitor
   datanodeManager.activate(conf);
   // Start the BlockManager's ReplicationMonitor
   this.replicationThread.start();
 }
 
    The namenode's main responsibility is managing file metadata and the file-to-block mapping. Accordingly, its startup flow centers on the worker threads that communicate with clients and datanodes, the file-metadata management machinery, and the block management machinery.
    Among these, the RpcServer handles communication with clients and datanodes, while FSDirectory manages the file metadata.
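   To see the client side of that RpcServer channel, a minimal sketch (the hdfs://localhost:9000 URI matches the clientNamenodeAddress mentioned above and is an assumption about a local pseudo-cluster): listStatus is a metadata-only call, so it is served entirely by the NameNode and never touches a datanode.
   
   import java.net.URI;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   public class ListRootDemo {
     public static void main(String[] args) throws Exception {
       Configuration conf = new Configuration();
       // listStatus goes to the NameNode's RpcServer (ClientProtocol) only
       try (FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf)) {
         for (FileStatus st : fs.listStatus(new Path("/"))) {
           System.out.println(st.getPath() + " len=" + st.getLen());
         }
       }
     }
   }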
 
 3. DataNode Startup Flow
   
   The datanode's main class is DataNode, so first find DataNode.main():
   
   public class DataNode extends ReconfigurableBase
  implements InterDatanodeProtocol, ClientDatanodeProtocol,
    TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {
 public static final Logger LOG = LoggerFactory.getLogger(DataNode.class);
  
   static{
  HdfsConfiguration.init();
 }
   public static void main(String args[]) {
 if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
  System.exit(0);
}
 secureMain(args, null);
}
...
  
  public static void secureMain(String args[], SecureResources resources) {
 int errorCode = 0;
 try {
  // Print the startup message
  StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
  // Do the main work of creating the datanode
  DataNode datanode = createDataNode(args, null, resources);
  if (datanode != null) {
   datanode.join();
 } else {
   errorCode = 1;
 }
} catch (Throwable e) {
  // LOG is an slf4j Logger (declared above), so error() is used here
  LOG.error("Exception in secureMain", e);
  terminate(1, e);
} finally {
  LOG.warn("Exiting Datanode");
  terminate(errorCode);
}
}
  
  ---------------------------------------------------
   public static DataNode createDataNode(String args[], Configuration conf,
   SecureResources resources) throws IOException {
  // Does most of the initialization and starts some of the worker threads
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
   // Start the remaining worker threads
   dn.runDatanodeDaemon();
 }
  return dn;
 }
  
  --------------------------------------------------
  /** Start a single datanode daemon and wait for it to finish.
* If this thread is specifically interrupted, it will stop waiting.
*/
public void runDatanodeDaemon() throws IOException {
  // startAll() is also invoked during DataNode.instantiateDataNode() (see below)
 blockPoolManager.startAll();
 dataXceiverServer.start();
 if (localDataXceiverServer != null) {
  localDataXceiverServer.start();
}
 ipcServer.start();
 startPlugins(conf);
}  
--------------------------------------------------------
   public static DataNode instantiateDataNode(String args[], Configuration conf,
   SecureResources resources) throws IOException {
  if (conf == null)
    conf = new HdfsConfiguration();
 ... // Argument checks, etc.

 Collection<StorageLocation> dataLocations = getStorageLocations(conf);
 UserGroupInformation.setConfiguration(conf);
 SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
   DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
 return makeInstance(dataLocations, conf, resources);
}
--------------------------------------------------------------------

 // DataNode.makeInstance() begins creating the DataNode
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
  Configuration conf, SecureResources resources) throws IOException {
 ...// Check the permissions of the data directories
 assert locations.size() > 0 : "number of data directories should be > 0";
 return new DataNode(conf, locations, resources);
}
...

DataNode(final Configuration conf,
    final List<StorageLocation> dataDirs,
    final SecureResources resources) throws IOException {
 super(conf);
 ...// Parameter setup
 try {
  hostName = getHostName(conf);
  LOG.info("Configured hostname is " + hostName);
  startDataNode(conf, dataDirs, resources);
} catch (IOException ie) {
  shutdown();
  throw ie;
}
}
...

   void startDataNode(Configuration conf,
          List<StorageLocation> dataDirs,
          SecureResources resources
          ) throws IOException {
 ...// Parameter setup
  // Initialize the DataStorage
  storage = new DataStorage();
  
  // global DN settings
  // Register the JMX MXBean
  registerMXBean();
  // Initialize the DataXceiver (streaming I/O); started later in runDatanodeDaemon()
  initDataXceiver(conf);
  // Start the InfoServer (Web UI)
  startInfoServer(conf);
  // Start the JvmPauseMonitor (watches the JVM itself; queryable via JMX)
  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();
 ...// Omitted
  // Initialize the IpcServer (RPC); started later in runDatanodeDaemon()
  initIpcServer(conf);
 
  metrics = DataNodeMetrics.create(conf, getDisplayName());
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
  // Initialize along the namespace (nameservice) / namenode structure
  blockPoolManager = new BlockPoolManager(this);
  blockPoolManager.refreshNamenodes(conf);
 ...// Omitted
 }
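   For orientation, a small sketch of where the three servers initialized above listen by default (key names and 2.9.2 defaults from hdfs-default.xml; the demo class is ours):
   
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.hdfs.HdfsConfiguration;
   
   public class DefaultPortsDemo {
     public static void main(String[] args) {
       Configuration conf = new HdfsConfiguration();
       System.out.println(conf.get("dfs.datanode.address"));      // 0.0.0.0:50010 - DataXceiver streaming
       System.out.println(conf.get("dfs.datanode.ipc.address"));  // 0.0.0.0:50020 - ipcServer RPC
       System.out.println(conf.get("dfs.datanode.http.address")); // 0.0.0.0:50075 - InfoServer Web UI
     }
   }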

   // The BlockPoolManager abstracts the block storage service the datanode provides.
   // It is organized along the namespace (nameservice) / namenode structure.
 // BlockPoolManager#refreshNamenodes():
 // besides being invoked during initialization, a refresh can also be triggered by a
 // command the namespace sends back through the datanode heartbeat.
void refreshNamenodes(Configuration conf)
  throws IOException {
 LOG.info("Refresh request received for nameservices: " + conf.get
    (DFSConfigKeys.DFS_NAMESERVICES));
 Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
    .getNNServiceRpcAddressesForCluster(conf);
 synchronized (refreshNamenodesLock) {
  doRefreshNamenodes(newAddressMap);
 }
}
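   refreshNamenodes reads the nameservice list from the configuration. A hypothetical federated setup that would make it create two block pools (the key names are the real HDFS federation keys; the hosts are made up):
   
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.hdfs.HdfsConfiguration;
   
   public class FederationConfDemo {
     public static void main(String[] args) {
       Configuration conf = new HdfsConfiguration();
       // Two nameservices => doRefreshNamenodes() creates two BPOfferServices,
       // each with one BPServiceActor per namenode address.
       conf.set("dfs.nameservices", "ns1,ns2");
       conf.set("dfs.namenode.rpc-address.ns1", "nn1.example.com:8020");
       conf.set("dfs.namenode.rpc-address.ns2", "nn2.example.com:8020");
     }
   }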
  -------------------------------------------------------
  private void doRefreshNamenodes(
  Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
 assert Thread.holdsLock(refreshNamenodesLock);
 Set<String> toRefresh = Sets.newLinkedHashSet();
 Set<String> toAdd = Sets.newLinkedHashSet();
 Set<String> toRemove;
 synchronized (this) {
  // Step 1. For each of the new nameservices, figure out whether
  // it's an update of the set of NNs for an existing NS,
  // or an entirely new nameservice.
  for (String nameserviceId : addrMap.keySet()) {
   if (bpByNameserviceId.containsKey(nameserviceId)) {
    toRefresh.add(nameserviceId);
  } else {
    toAdd.add(nameserviceId);
  }
 }
   ...// Omitted
 
  // Step 2. Start new nameservices
  if (!toAdd.isEmpty()) {
   LOG.info("Starting BPOfferServices for nameservices: " +
     Joiner.on(",").useForNull("<default>").join(toAdd));
 
   for (String nsToAdd : toAdd) {
    ArrayList<InetSocketAddress> addrs =
     Lists.newArrayList(addrMap.get(nsToAdd).values());
     // Create the corresponding BPOfferService for each nameservice
    BPOfferService bpos = createBPOS(addrs);
    bpByNameserviceId.put(nsToAdd, bpos);
    offerServices.add(bpos);
  }
}
   // Then start all the BPOfferServices via startAll()
  startAll();
}
 ...// Omitted
}
------------------------------------------------

protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
 return new BPOfferService(nnAddrs, dn);
}
     
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
 Preconditions.checkArgument(!nnAddrs.isEmpty(),
   "Must pass at least one NN.");
 this.dn = dn;
 for (InetSocketAddress addr : nnAddrs) {
  this.bpServices.add(new BPServiceActor(addr, this));
}
}
  --------------------------------------------
   // BlockPoolManager#startAll() starts all the BPOfferServices
   // (in effect, all the BPServiceActors).
  synchronized void startAll() throws IOException {
 try {
  UserGroupInformation.getLoginUser().doAs(
    new PrivilegedExceptionAction<Object>() {
     @Override
     public Object run() throws Exception {
      for (BPOfferService bpos : offerServices) {
       bpos.start();
     }
      return null;
    }
   });
} catch (InterruptedException ex) {
  IOException ioe = new IOException();
  ioe.initCause(ex.getCause());
  throw ioe;
}
}
  
  -------------------------------------------------------
   // The datanode startup main path starts several kinds of worker threads, including
   // the InfoServer, JvmPauseMonitor, and BPServiceActors. The most important are the
   // BPServiceActor threads: it is a BPServiceActor that actually speaks to a namenode
   // on the datanode's behalf.
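   To make that actor pattern concrete, a self-contained toy (all names below are ours, not Hadoop's): one thread per namenode endpoint, looping on "send a heartbeat, execute the commands that come back", with the 3-second default of dfs.heartbeat.interval.
   
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.TimeUnit;
   
   public class ToyServiceActor implements Runnable {
     private final String nnAddress;
     private volatile boolean shouldRun = true;
   
     ToyServiceActor(String nnAddress) { this.nnAddress = nnAddress; }
   
     /** Stand-in for DatanodeProtocol.sendHeartbeat(); a real actor gets DatanodeCommands back. */
     private List<String> sendHeartbeat() {
       return Collections.emptyList();
     }
   
     @Override public void run() {
       while (shouldRun) {
         // Heartbeat, then execute whatever commands the namenode returned
         for (String cmd : sendHeartbeat()) {
           System.out.println(nnAddress + " -> executing " + cmd);
         }
         try {
           TimeUnit.SECONDS.sleep(3); // dfs.heartbeat.interval defaults to 3s
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           shouldRun = false;
         }
       }
     }
   
     public static void main(String[] args) {
       new Thread(new ToyServiceActor("nn1.example.com:8020")).start();
     }
   }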
  // DataNode#initBlockPool():
  /**
* One of the Block Pools has successfully connected to its NN.
* This initializes the local storage for that block pool,
* checks consistency of the NN's cluster ID, etc.
*
* If this is the first block pool to register, this also initializes
* the datanode-scoped storage.
*
* @param bpos Block pool offer service
* @throws IOException if the NN is inconsistent with the local storage.
*/
  
   void initBlockPool(BPOfferService bpos) throws IOException {
 ...// Omitted
  // Register the block pool with the BlockPoolManager
  blockPoolManager.addBlockPool(bpos);
  // First-stage initialization of the storage structures
  initStorage(nsInfo);
 ...// Check for disk damage
  // Start the periodic scanners
  initPeriodicScanners(conf);
  // Add the block pool to FsDatasetImpl and continue initializing the storage structures
  data.addBlockPool(nsInfo.getBlockPoolID(), conf);
 }


   
   
   
   